

比如说前端发起了多个请求,需求是按照顺序返回结果,比如说1,2,3,4,5。但我们都知道,ajax请求是异步的,不能确保返回结果的顺序,有可能返回的是1,3,2,5,4。这样就不满足我们的需求了。我可以怎么做呢?可以使用 async...await 按照顺序请求,或者Promise.all来请求。那么还有一种解决方案,就是rxjs。
async...await 是前端比较熟悉的处理方式,但是到了nestjs里面,使用async...await就不太合适了?为什么呢?因为同一段时间里面的数据量不同,前端里面1s内可能要求处理5个顺序请求,写成async...await没有问题,但是后端的1s里面,可能接收到几百上千个请求,请问这时候怎么做?一个一个把async...await写出来吗?肯定是不行的。所以就有了rxjs。
比如说我在nestjs里面看到的这段代码,曾经最让我难以想象的:统一处理返回结果的这段代码,response.interceptor.ts。
xxxxxxxxxx541import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common'2import { Observable } from 'rxjs'3import { map } from 'rxjs/operators'4import { ErrorCode } from '@/shared/constants/error.constants'56export interface Response {7 statusCode: number8 message: string9}1011@Injectable()12export class ResponseInterceptor implements NestInterceptor {13 intercept(context: ExecutionContext, next: CallHandler): Observable<any> {14 return next.handle().pipe(15 map((data) => {16 const newData = data as any1718 /**19 如果返回结果没有response属性,这个response属性异常拦截器会给予的,如果接口正常返回结果,就没有这个response属性。20 正常返回的newData是这样的: { rows: [], page: { total: 0, page: 1, limit: 10 } }2122 那么ErrorCode是什么呢?这是作者自定义的一些信息,打开这个文件你就知道了。23 */24 if (!newData?.response) {25 return {26 data: newData,27 code: ErrorCode.SUCCESS.CODE,28 msg: ErrorCode.SUCCESS.MESSAGE,29 }30 }3132 /**33 * 如果newData有response属性,那么肯定是哪个拦截器起作用了,ErrorCode是一个类,定义有HasCode方法,所以可以用这个方法来识别具体的错误类型。34 */35 let code: number, msg: string36 // ErrorCode只需要定义一些常见的错误,如果是常见的错误,就可以给出一些明确的提示37 if (ErrorCode.HasCode(newData.response.code)) {38 code = newData.response.code39 msg = newData.message || ErrorCode.CodeToMessage(code)40 } else {41 // 如果不是常见的错误,那么就统一返回Error这种错误类型。42 code = ErrorCode.ERROR.CODE43 msg = newData.message || ErrorCode.ERROR.MESSAGE44 }4546 return {47 code,48 msg,49 }50 }),51 )52 }53}54这段代码我初看很难理解,但是现在有了一点眉目了,我的理解:
我创建了一个ResponseInterceptor,它是实现了NestInterceptor的,实现了intercept这个方法。那为什么要实现而不是直接重新写一个呢?这是因为NestInterceptor本身提供了很多方法,特别是这里用到的intercept方法,我只需要实现intercept的逻辑,就可以实现异步顺序处理数据。
intercept有两个主要参数,context和next,context我很熟悉,就是上下文,但是这里的next,我是怎么都想不通,为什么这里是next参数,它是怎么来的。
在 RxJS 中,Observable 是核心概念,它代表了一个可以发送多个值的流。所以interceptor方法就是创建了一个可以发送多个值的流。
next.handle() 调用下一个拦截器或控制器方法,并返回一个 Observable 对象。
pipe 方法用于对 Observable 进行操作,这里使用 map 操作符对响应数据进行转换。
如果响应数据中没有 response 属性,表示接口正常返回,将响应数据封装在 data 字段中,并设置 code 为 ErrorCode.SUCCESS.CODE,msg 为 ErrorCode.SUCCESS.MESSAGE。
如果响应数据中有response属性,表示可能出现了错误。通过ErrorCode.HasCode方法判断错误码是否为常见错误码:
ErrorCode.ERROR.CODE,错误消息设置为 ErrorCode.ERROR.MESSAGE。next.handle() 调用下一个拦截器或控制器方法,并返回一个 Observable 对象。应该怎么理解?
在 NestJS 的拦截器机制里,
next.handle()是一个关键的调用,它用于控制请求处理流程的推进,并且返回一个Observable对象,下面从几个方面详细解释:拦截器的工作原理
在 NestJS 中,拦截器是一种特殊的组件,它可以在请求到达控制器方法之前或之后执行一些额外的逻辑。拦截器可以用于日志记录、性能监控、响应转换等多种场景。拦截器的工作流程可以简单描述为:请求进入应用程序 -> 经过一系列拦截器 -> 到达控制器方法 -> 控制器方法处理请求并返回响应 -> 响应再经过一系列拦截器 -> 返回给客户端。
next.handle()的作用
next.handle()是CallHandler实例的一个方法,它的主要作用是将请求传递给下一个处理程序。这个处理程序可以是下一个拦截器,也可以是最终的控制器方法,具体取决于当前拦截器在拦截器链中的位置。返回
Observable对象
next.handle()返回一个Observable对象,这是因为 NestJS 采用了 RxJS 来处理异步操作。Observable是 RxJS 中的一个核心概念,它表示一个可观察的数据流,可以用来处理异步事件序列。代码示例说明
以下是一个简单的示例,帮助你更好地理解
next.handle()的作用:xxxxxxxxxx181import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';2import { Observable } from 'rxjs';3import { map } from 'rxjs/operators';45@Injectable()6export class LoggingInterceptor implements NestInterceptor {7intercept(context: ExecutionContext, next: CallHandler): Observable<any> {8console.log('Before...'); // 在请求处理之前执行的逻辑910const now = Date.now();11return next.handle().pipe(12map(data => {13console.log(`After... ${Date.now() - now}ms`); // 在请求处理之后执行的逻辑14return data;15}),16);17}18}在这个示例中:
console.log('Before...'):在调用next.handle()之前执行,这部分代码会在请求到达下一个处理程序(下一个拦截器或控制器方法)之前执行。next.handle():将请求传递给下一个处理程序,并返回一个Observable对象。pipe(map(...)):使用 RxJS 的map操作符对Observable发出的值进行转换。在这个例子中,map操作符中的代码会在控制器方法处理完请求并返回响应之后执行。总结
next.handle()是 NestJS 拦截器中用于控制请求处理流程的关键方法,它将请求传递给下一个处理程序,并返回一个Observable对象,允许你使用 RxJS 的操作符对请求和响应进行进一步的处理。通过这种方式,你可以在请求处理的前后执行自定义的逻辑。
pipe()方法是干什么的?
在 RxJS(Reactive Extensions for JavaScript)中,
pipe()方法是一个非常重要的方法,它用于组合多个操作符(operators),以便对Observable发出的数据进行链式处理。下面详细介绍pipe()方法的作用、用法和示例。作用
Observable代表一个可观察的数据流,而操作符则是用于处理这个数据流的工具。pipe()方法允许你将多个操作符连接在一起,形成一个操作管道,数据会依次通过这些操作符进行处理,每个操作符都会对数据进行特定的转换或操作。这样可以使代码更加模块化、可维护,并且能够清晰地表达数据处理的逻辑。用法
pipe()方法接受任意数量的操作符作为参数,这些操作符会按照传递的顺序依次对Observable发出的数据进行处理。其基本语法如下:xxxxxxxxxx51observable.pipe(2operator1(),3operator2(),4// 可以添加更多操作符5);这里的
observable是一个Observable实例,operator1、operator2等是 RxJS 提供的各种操作符,例如map、filter、tap等。常见操作符及示例
以下是一些常见的操作符及其在
pipe()方法中的使用示例:1.
map操作符
map操作符用于对Observable发出的每个值进行转换。xxxxxxxxxx141import { of } from 'rxjs';2import { map } from 'rxjs/operators';34// 创建一个 Observable,发出 1、2、35const source = of(1, 2, 3);67// 使用 pipe() 方法和 map 操作符将每个值乘以 28const result = source.pipe(9map(value => value * 2)10);1112// 订阅结果 Observable 并打印每个值13result.subscribe(value => console.log(value));14// 输出: 2, 4, 62.
filter操作符
filter操作符用于过滤Observable发出的值,只允许满足特定条件的值通过。xxxxxxxxxx141import { of } from 'rxjs';2import { filter } from 'rxjs/operators';34// 创建一个 Observable,发出 1、2、3、4、55const source = of(1, 2, 3, 4, 5);67// 使用 pipe() 方法和 filter 操作符过滤出偶数8const result = source.pipe(9filter(value => value % 2 === 0)10);1112// 订阅结果 Observable 并打印每个值13result.subscribe(value => console.log(value));14// 输出: 2, 43. 组合多个操作符
可以在
pipe()方法中组合多个操作符,按照顺序依次对数据进行处理。xxxxxxxxxx161import { of } from 'rxjs';2import { map, filter } from 'rxjs/operators';34// 创建一个 Observable,发出 1、2、3、4、55const source = of(1, 2, 3, 4, 5);67// 使用 pipe() 方法组合 map 和 filter 操作符8// 先过滤出偶数,再将每个偶数乘以 109const result = source.pipe(10filter(value => value % 2 === 0),11map(value => value * 10)12);1314// 订阅结果 Observable 并打印每个值15result.subscribe(value => console.log(value));16// 输出: 20, 40在 NestJS 拦截器中的应用
在你之前提供的 NestJS 拦截器代码中,
pipe()方法用于对next.handle()返回的Observable进行处理:xxxxxxxxxx81return next.handle().pipe(2map((data) => {3// 对响应数据进行处理4return {5// 处理后的数据6};7})8);这里使用
map操作符对Observable发出的响应数据进行转换,将其封装成统一的格式。综上所述,
pipe()方法是 RxJS 中用于组合操作符、处理Observable数据流的重要工具,它使得数据处理逻辑更加清晰和模块化。
rxjs时创建了数据流,这种“流”的概念,我想我需要不断的体会。
我最难体会的就是订阅操作,照我的想法,订阅应该是把整个数据流拿到了,但是这种相反是错误的,不是吗?
既然是“拿到了”,那么之后别人想订阅这个数据流,该向谁订阅呢?是不是需要向拿到了这个数据流的对象来订阅,这样搞就非常复杂了,就没有了“流”的概念了。
所以针对这个数据流的概念,我问了一下:怎么样理解rxjs里面的数据流?
答:
在 RxJS(Reactive Extensions for JavaScript)里,数据流可理解为随时间产生的值的序列,就像水流一样持续流动并携带信息。以下从几个方面帮助你理解 RxJS 中的数据流:
1. 数据流的产生
- 数据源:数据流可以由多种方式产生。例如,用户的鼠标点击、键盘输入、网络请求的响应、定时器的计时等。这些数据源就像水龙头,一旦开启,就会产生一连串的数据。
- 创建 Observable:在 RxJS 中,
Observable(可观察对象)是对数据流的抽象表示。你可以使用Observable的构造函数或者一些创建操作符(如of、from、interval等)来创建一个数据流。xxxxxxxxxx41import { of } from 'rxjs';23// 创建一个包含1, 2, 3的数据流4const source = of(1, 2, 3);2. 数据流的流动
- 订阅(Subscription):数据流不会自动开始流动,需要有订阅者来订阅它。订阅者通过调用
Observable的subscribe方法来接收数据流中的值。当订阅发生时,数据流就开始流动,值会依次传递给订阅者。xxxxxxxxxx61import { of } from 'rxjs';23const source = of(1, 2, 3);4source.subscribe((value) => {5console.log(value);6});
- 时间维度:数据流中的值是随时间依次产生的。例如,使用
interval操作符创建的数据流会每隔一段时间产生一个新的值。xxxxxxxxxx71import { interval } from 'rxjs';23// 每隔1秒产生一个从0开始递增的值4const source = interval(1000);5source.subscribe((value) => {6console.log(value);7});3. 数据流的处理
- 操作符(Operators):RxJS 提供了丰富的操作符,用于对数据流进行处理和转换。操作符可以像管道一样连接在一起,形成一个处理链。例如,
map操作符可以对数据流中的每个值进行转换,filter操作符可以过滤掉不符合条件的值。xxxxxxxxxx121import { of } from 'rxjs';2import { map, filter } from 'rxjs/operators';34const source = of(1, 2, 3, 4, 5);5const result = source.pipe(6filter((value) => value % 2 === 0),7map((value) => value * 2)8);910result.subscribe((value) => {11console.log(value);12});4. 数据流的结束
- 完成信号:数据流可以有一个结束的信号。当数据流中的所有值都被发送完毕后,会发送一个
complete信号给订阅者,表示数据流已经结束。订阅者可以在subscribe方法中传入一个回调函数来处理这个完成信号。xxxxxxxxxx71import { of } from 'rxjs';23const source = of(1, 2, 3);4source.subscribe({5next: (value) => console.log(value),6complete: () => console.log('数据流结束')7});
- 错误处理:如果在数据流的产生或处理过程中发生了错误,会发送一个
error信号给订阅者。订阅者可以在subscribe方法中传入一个错误处理回调函数来处理这个错误。xxxxxxxxxx61import { throwError } from 'rxjs';23const source = throwError(new Error('发生错误'));4source.subscribe({5error: (err) => console.error('错误信息:', err.message)6});综上所述,RxJS 中的数据流是一种强大的抽象概念,它允许你以声明式的方式处理异步数据,使得代码更加简洁、可维护和可测试。


从事件这个概念提供的案例可以看到,前端、后端都可以使用rxjs。

这里有一个重要的概念:Schedulers。这个概念解决了我心中的一个疑惑:nestjs中为什么需要rxjs?
在学习nestjs的过程中,我学习了拦截器、控制器、过滤器、装饰器这些概念,由此引出了AOP的概念。但是为什么能够实现AOP?这个我是不懂的。那么rxjs中的Schedulers就解决了这个问题,是通过调度器来控制事件的并发情况的。



直接在https://rxjs.dev的F12里面运行已经不行了,它提示这样:

在这里来测试代码:https://stackblitz.com/edit/rxjs
上面的代码应该写成这样:
xxxxxxxxxx31import { interval, take } from 'rxjs';23interval(500).pipe(take(4)).subscribe(console.log);效果:




这里我获取了一个好消息,最常见的rxjs的API,都从rxjs或者rxjs/operators引入,红框中的四种位置都是蛮好记的。

下面是一个例子:

上面的代码,输出结果之后和老师的不一样:

也不知道是怎么回事。这是在stackblitz里面测试的,我直接在html文件里面测试,看会怎么样?
xxxxxxxxxx2312<html lang="en">3<head>4 <meta charset="UTF-8">5 <meta name="viewport" content="width=device-width, initial-scale=1.0">6 <title>Document</title>7 <script src="https://cdn.bootcdn.net/ajax/libs/rxjs/7.8.1/rxjs.umd.min.js"></script>8</head>9<body>10 <div id="test">1112 </div>1314 <script type="module">15 const {fromEvent} = rxjs1617 const click$ = fromEvent(document,'click')1819 const subs = click$.subscribe({next : d => console.log(d)})2021 </script>22</body>23</html>查看效果是正常的:

上面这张图很重要,差不多就是rxjs的标准编写流程了。其中有一段代码很难理解:
click$是可观察的物件,而observer是观察者物件。难道不应该是观察者物件去订阅可观察的物件吗?不应该写成这样吗?
问:为什么不是observer.subscribe(click$)这种方式来调用呢?不应该是 观察者订阅Observable的变化吗?
答:
这是一个很好的问题,涉及到 RxJS 的核心概念和设计模式。让我们详细解释为什么在 RxJS 中使用
observable.subscribe(observer)而不是observer.subscribe(observable):1. Observable 和 Observer 的关系
在 RxJS 中,Observable 和 Observer 的关系类似于生产者和消费者的关系:
- Observable:是事件的生产者。它会在适当的时候发出(“推送”)事件或数据。
- Observer:是事件的消费者。它会定义如何处理从 Observable 发出的事件。
2. 订阅的方向
订阅机制决定了谁来控制数据流动:
- Observable.subscribe(observer):这表示 “订阅者(observer)订阅生产者(observable)”。在这段代码中,Observable 负责发出事件,而 Observer 则定义了如何处理这些事件。
- observer.subscribe(observable):如果按照这种方式调用,那就意味着“生产者”反过来去订阅“消费者”的变化,这违背了 Observable 模式的本意。
3. 为何设计为
observable.subscribe(observer)设计 RxJS 为
observable.subscribe(observer)的方式有几个原因:
- 符合事件的自然流向:事件是由 Observable 发出的,所以 Observable 自然地“拥有”这个流动控制权,而 Observer 是去“订阅”这个流。将
subscribe方法定义在 Observable 上,更符合事件从源头发出到被处理的流向。- 职责分离:Observable 和 Observer 分别承担不同的职责。Observable 负责产生数据(事件),Observer 负责处理数据(反应)。这种设计保持了单一职责原则,清晰地分离了生产和消费的逻辑。
- 便于链式操作:RxJS 的强大之处在于能够以链式的方式操作数据流。例如,通过
observable.pipe(...).subscribe(observer),你可以在订阅之前对数据进行各种操作(例如过滤、映射、合并等)。这样的链式设计提高了代码的可读性和可维护性。4. 举个例子
想象一个电台广播和听众的例子:
- 电台广播(Observable)负责发出广播信号。
- 听众(Observer)可以选择是否收听(订阅)这些广播。
在这个比喻中,电台不会去主动寻找听众并“订阅”他们,而是听众决定去收听电台的广播。这和
observable.subscribe(observer)的设计是类似的。总结
- Observable.subscribe(observer) 是 RxJS 中的标准模式,符合事件从生产者到消费者的自然流向。
- 这种设计方式便于维护和扩展,也更符合大多数编程语言和框架中的事件处理模式。
通过这种设计,RxJS 保持了良好的模块化和灵活性,使得开发者能够以一种声明式和易于理解的方式处理复杂的异步数据流。
https://rxjs.dev/guide/operators
在这个地址中,有所有的operators的列表,完整的列表可以查看references page。

下面是一个过滤的例子:

xxxxxxxxxx3412<html lang="en">3<head>4 <meta charset="UTF-8">5 <meta name="viewport" content="width=device-width, initial-scale=1.0">6 <title>Document</title>7 <script src="https://cdn.bootcdn.net/ajax/libs/rxjs/7.8.1/rxjs.umd.min.js"></script>8 <style>9 body{10 margin: 0;11 padding: 0;12 }13 #test{14 width: 100vw;15 height: 100vh;16 }17 </style>18</head>19<body>20 <div id="test">21 <div style="width: 100px;height: 100%;background-color: lightblue;">22 </div>23 </div>2425 <script type="module">26 const {fromEvent} = rxjs27 const {filter} = rxjs.operators2829 const click$ = fromEvent(document,'click')3031 const subs = click$.pipe(filter(x => x.clientX < 100)).subscribe({next : d => console.log(d)})32 </script>33</body>34</html>查看效果:


xxxxxxxxxx3812<html lang="en">34<head>5 <meta charset="UTF-8">6 <meta name="viewport" content="width=device-width, initial-scale=1.0">7 <title>Document</title>8 <script src="https://cdn.bootcdn.net/ajax/libs/rxjs/7.8.1/rxjs.umd.min.js"></script>9 <style>10 body {11 margin: 0;12 padding: 0;13 }1415 #test {16 width: 100vw;17 height: 100vh;18 }19 </style>20</head>2122<body>23 <div id="test">24 <div style="width: 100px;height: 100%;background-color: lightblue;">25 </div>26 </div>2728 <script type="module">29 const { fromEvent } = rxjs30 const { filter, take } = rxjs.operators3132 const click$ = fromEvent(document, 'click')3334 const subs = click$.pipe(filter(x => x.clientX < 100), take(4)).subscribe({ next: d => console.log(d) })35 </script>36</body>3738</html>查看效果:

注意我的鼠标,我的鼠标每移动一次位置,我都会点击一次,我总共移动了8次,为什么只输出4次呢?这四次是哪四次呢?首先需要符合filter的条件,那么前2次就没有输出,接着4次符合filter条件,并且take递增,所以有4次输出,最后2次因为不符合take条件了,所以没有输出。
在之前的例子中,我们都是定义一个观察者物件,如下图:

但是在下面,我们将先创建一个Subject主体物件,然后再由这个主体物件去创建观察者物件并订阅。

上面这个例子是什么意思呢?可以理解为中介者模式,这里的subject相当于一个中介者(同时是Observable和Observer),当有多个观察者来订阅Observable时,通过使用subject来订阅Observable,然后其余的观察者订阅subject。比如说多个观察者订阅x.clientX,可以让Observable只发出一次消息即可,真正的广播动作交给subject来完成,这样做的好处就是Observable可以尽可能保持简单。
xxxxxxxxxx2812<html lang="en">3<head>4 <meta charset="UTF-8">5 <meta name="viewport" content="width=device-width, initial-scale=1.0">6 <title>Document</title>7 <script src="https://cdn.bootcdn.net/ajax/libs/rxjs/7.8.1/rxjs.umd.min.js"></script>8</head>9<body>10 <div id="test">1112 </div>1314 <script type="module">15 const {fromEvent,Subject} = rxjs1617 const subject = new Subject()1819 const click$ = fromEvent(document,'click')2021 click$.subscribe(subject)2223 const subs1$ = subject.subscribe(d => console.log(d))24 const subs2$ = subject.subscribe(d => console.log(d))2526 </script>27</body>28</html>

这张图怎么看呢?横向可以看成是时间,比如说上面这个可以看成是要或者待执行的代码(或者说数据流)的顺序,尾部的竖线表示结束。

那么经过take(2)处理之后,尾部的竖线就到了b上面,表示只执行2个就结束了。表示真实执行的代码(或者说数据流)的顺序。


上面的1、2、3是待处理的数据流,经过map(x => 10 * x)处理之后,就变为了10、20、30。

concat是用来串接两个Observable对象。a、b和x、y本来是属于不同的Observable的,但是经过concat处理之后,可以变为一个Observable。

这个网站太棒了,对于理解rxjs真的很有帮助。大部分的API可以通过移动元素来交互感受。

https://reactive.how/rxjs/explorer



如何选择需要的operator呢?在官网的这里可以交互式的选择:

操作示例:

最后得到了fromEvent这个API,可以供参考。











因为rxjs有很多API的名称和js的数组或者对象的方法有重名,所以不要搞混了。比如说rxjs里面的map、concat方法,不要和js的数组的方法搞混了。既然是用在rxjs上面的,肯定就是rxjs的map、concat方法。